package com.song.elasticsearch.controller;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * REST endpoints under {@code /kafka} that forward HTTP payloads to Kafka topics.
 *
 * <p>Records are handed to the injected {@link KafkaProducer} asynchronously. The
 * HTTP response is produced as soon as the record has been handed over, so a
 * successful response does not guarantee broker delivery; delivery failures are
 * reported through the log by the send callback.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    private static final Logger LOGGER = Logger.getLogger(KafkaController.class.getName());

    /** Topic used by the path-variable endpoint, which carries no topic of its own. */
    private static final String DEFAULT_TOPIC = "demo";

    /** Key in the POST body that names the destination topic. */
    private static final String TOPIC_KEY = "topic";

    @Autowired
    KafkaProducer<String, String> kafkaProducer;

    /**
     * Publishes the path segment {@code msg} as a record value to the {@code demo} topic.
     *
     * @param message the text taken from the URL path; used verbatim as the record value
     */
    @GetMapping("/send/{msg}")
    public void send(@PathVariable("msg") String message){
        publish(DEFAULT_TOPIC, message);
    }

    /**
     * Publishes a JSON body to the topic named by its {@code topic} entry.
     *
     * <p>The {@code topic} entry is removed from the map and the remaining entries
     * are serialized to a JSON string, which becomes the record value.
     *
     * @param map the request body; must contain a non-blank {@code topic} entry
     * @return {@code "缺少topic"} when the topic is absent or blank, otherwise
     *         {@code "发送成功"} once the record has been handed to the producer
     */
    @PostMapping("/send")
    public String send(@RequestBody Map<String, Object> map){
        // Remove the routing key up front so it is not echoed into the payload.
        Object topicValue = map.remove(TOPIC_KEY);
        if (topicValue == null){
            return "缺少topic";
        }
        String topic = topicValue.toString();
        // A blank name can never be a valid topic; treat it the same as a missing one
        // instead of letting the producer fail asynchronously.
        if (topic.trim().isEmpty()){
            return "缺少topic";
        }
        publish(topic, JSONObject.toJSONString(map));
        return "发送成功";
    }

    /**
     * Hands one record to the producer and logs any asynchronous delivery failure.
     *
     * @param topic   destination topic
     * @param message record value
     */
    private void publish(String topic, String message){
        // Supplier form: the string is only built when FINE logging is enabled.
        LOGGER.fine(() -> "Publishing to Kafka topic " + topic + ": " + message);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        // Without a callback a failed send would go unnoticed, because the returned
        // future is never inspected.
        kafkaProducer.send(record, (metadata, exception) -> {
            if (exception != null){
                LOGGER.log(Level.WARNING, "Failed to send record to Kafka topic " + topic, exception);
            }
        });
    }
}
